package com.tca.common.learning.webflux.reactor.sink;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author zhoua
 * @date 2022/1/8 16:30
 */
public class CreateTest {

    /**
     * Minimal in-memory event source that fans events out to registered listeners.
     *
     * <p>Listeners are invoked synchronously, in registration order, on the thread that
     * calls {@link #newEvent(MyEvent)} or {@link #eventStopped()}. The listener list is a
     * plain {@link ArrayList} and this class performs no synchronization of its own.
     */
    public static class MyEventSource {

        /** Registered listeners, in the order they were added. */
        private final List<MyEventListener> listeners = new ArrayList<>();

        /**
         * Adds a listener that will receive all subsequent callbacks.
         *
         * @param listener the listener to add; must not be {@code null}
         * @throws NullPointerException if {@code listener} is {@code null}
         */
        public void register(MyEventListener listener) {
            // Fail fast here: a null entry would otherwise only blow up later, during dispatch.
            listeners.add(Objects.requireNonNull(listener, "listener"));
        }

        /**
         * Delivers {@code event} to every registered listener.
         *
         * @param event the event to hand to each listener's {@code onNewEvent}
         */
        public void newEvent(MyEvent event) {
            for (MyEventListener listener : listeners) {
                listener.onNewEvent(event);
            }
        }

        /**
         * Notifies every registered listener that this source has stopped.
         */
        public void eventStopped() {
            for (MyEventListener listener : listeners) {
                listener.onEventStopped();
            }
        }
    }

    /**
     * Simple mutable event payload carrying a timestamp and a text message.
     *
     * <p>Lombok generates the boilerplate: {@code @Data} supplies getters, setters,
     * {@code equals}/{@code hashCode} and {@code toString}; {@code @NoArgsConstructor} and
     * {@code @AllArgsConstructor} supply an empty constructor and one taking
     * {@code (timeStamp, message)} in field-declaration order.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class MyEvent {
        /** Time associated with the event; the demo in this file passes {@code new Date()}. */
        private Date timeStamp;
        /** Text describing the event; the demo in this file uses {@code "Event-" + index}. */
        private String message;
    }

    public static interface MyEventListener {
        /**
         * 监听事件
         * @param event
         */
        void onNewEvent(MyEvent event);

        /**
         * 事件停止
         */
        void onEventStopped();
    }

    /**
     * Program entry point; runs the {@code Flux.create} demonstration.
     *
     * @param args command-line arguments; not used
     * @throws InterruptedException if the thread is interrupted while the demo sleeps between events
     */
    public static void main(String[] args) throws InterruptedException {
        create();
    }

    /**
     * Demonstrates bridging a listener-style event API into a {@link Flux} via
     * {@code Flux.create}.
     *
     * <p>The emitter registers a listener on a {@link MyEventSource}: each event callback is
     * forwarded to the sink with {@code next}, and the stop callback completes the sink. The
     * method then publishes {@code total} events, pausing a random 0-999 ms before each one,
     * and finally tells the source to stop. Every emitted event is printed to standard output.
     *
     * @throws InterruptedException if the thread is interrupted during one of the pauses
     */
    private static void create() throws InterruptedException {
        final int total = 10;
        // One generator for the whole run; allocating a new Random per iteration is wasted work.
        final Random random = new Random();
        final MyEventSource eventSource = new MyEventSource();

        // Explicit type witness so the pipeline is Flux<MyEvent> instead of the inferred Flux<Object>.
        Flux.<MyEvent>create(sink -> eventSource.register(new MyEventListener() {
            @Override
            public void onNewEvent(MyEvent event) {
                // Re-emit every event received from the source through the sink.
                sink.next(event);
            }

            @Override
            public void onEventStopped() {
                sink.complete();
            }
        })).subscribe(System.out::println);

        try {
            for (int i = 0; i < total; i++) {
                TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
                eventSource.newEvent(new MyEvent(new Date(), "Event-" + i));
            }
        } finally {
            // Always deliver the stop signal so the sink completes even if a pause is interrupted.
            eventSource.eventStopped();
        }
    }
}



